1
//--------------------------------------------------------------------------
3 // Copyright (c) Microsoft Corporation. All rights reserved.
5 // File: ParallelAlgorithms_Scan.cs
7 //--------------------------------------------------------------------------
9 using System
.Collections
.Concurrent
;
10 using System
.Collections
.Generic
;
12 using System
.Threading
.Tasks
;
14 namespace System
.Threading
.Algorithms
16 public static partial class ParallelAlgorithms
/// <summary>Computes a prefix scan over the source enumerable with load balancing disabled.</summary>
/// <typeparam name="T">The type of the data in the source.</typeparam>
/// <param name="source">The source data over which a prefix scan should be computed.</param>
/// <param name="function">The function used to combine two elements of the scan.</param>
/// <returns>The results of the scan operation.</returns>
/// <remarks>
/// This is a convenience overload that forwards to the three-argument overload.
/// For very small functions, such as additions, an implementation targeted
/// at the relevant type and operation will perform significantly better than
/// this generalized implementation.
/// </remarks>
public static T[] Scan<T>(IEnumerable<T> source, Func<T, T, T> function)
{
    // Load balancing is opt-in; callers who want it use the three-argument overload.
    const bool loadBalance = false;
    return Scan(source, function, loadBalance);
}
/// <summary>Computes a prefix scan over a copy of the source enumerable using the specified function.</summary>
/// <typeparam name="T">The type of the data in the source.</typeparam>
/// <param name="source">The source data over which a prefix scan should be computed. It is copied into a new array before scanning.</param>
/// <param name="function">The function used to combine two elements of the scan.</param>
/// <param name="loadBalance">Whether to load-balance during processing.</param>
/// <returns>A newly allocated array holding the results of the scan operation.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="function"/> is null.
/// </exception>
/// <remarks>
/// For very small functions, such as additions, an implementation targeted
/// at the relevant type and operation will perform significantly better than
/// this generalized implementation.
/// </remarks>
public static T[] Scan<T>(IEnumerable<T> source, Func<T, T, T> function, bool loadBalance)
{
    // Validate both arguments before touching the source, so that a null function
    // fails fast instead of after the whole sequence has been enumerated and copied.
    if (source == null) throw new ArgumentNullException("source");
    if (function == null) throw new ArgumentNullException("function");

    // Scan a private copy so the in-place algorithm never writes to the caller's data.
    T[] output = source.ToArray();

    // Do the prefix scan in-place on the copy and return the results.
    ScanInPlace(output, function, loadBalance);
    return output;
}
/// <summary>Computes a prefix scan in-place on an array with load balancing disabled.</summary>
/// <typeparam name="T">The type of the data in the array.</typeparam>
/// <param name="data">The data over which a prefix scan should be computed. Upon exit, stores the results.</param>
/// <param name="function">The function used to combine two elements of the scan.</param>
/// <remarks>
/// This is a convenience overload that forwards to the three-argument overload.
/// For very small functions, such as additions, an implementation targeted
/// at the relevant type and operation will perform significantly better than
/// this generalized implementation.
/// </remarks>
public static void ScanInPlace<T>(T[] data, Func<T, T, T> function)
{
    // Load balancing is opt-in; callers who want it use the three-argument overload.
    const bool loadBalance = false;
    ScanInPlace(data, function, loadBalance);
}
/// <summary>Computes a prefix scan in-place on an array using the specified function.</summary>
/// <typeparam name="T">The type of the data in the array.</typeparam>
/// <param name="data">The data over which a prefix scan should be computed. Upon exit, stores the results.</param>
/// <param name="function">The function used to combine two elements of the scan.</param>
/// <param name="loadBalance">
/// Whether to use the load-balancing parallel implementation rather than the
/// range-partitioned one. Ignored when the serial implementation is selected.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="data"/> or <paramref name="function"/> is null.
/// </exception>
/// <remarks>
/// For very small functions, such as additions, an implementation targeted
/// at the relevant type and operation will perform significantly better than
/// this generalized implementation.
/// </remarks>
public static void ScanInPlace<T>(T[] data, Func<T, T, T> function, bool loadBalance)
{
    // Validate arguments
    if (data == null) throw new ArgumentNullException("data");
    if (function == null) throw new ArgumentNullException("function");

    int processorCount = Environment.ProcessorCount;

    // The parallel implementations of prefix scan end up executing the function
    // twice as many times as the sequential implementation. Thus, only if we have
    // more than 2 cores will a parallel version have a chance of running faster.
    if (processorCount <= 2)
    {
        InclusiveScanInPlaceSerial(data, function, 0, data.Length, 1);
    }
    else if (loadBalance)
    {
        InclusiveScanInPlaceWithLoadBalancingParallel(data, function, 0, data.Length, 1);
    }
    else if (data.Length < processorCount)
    {
        // The range-partitioned implementation hands each of its tasks
        // data.Length / ProcessorCount elements. When that quotient is zero the
        // leading tasks get empty ranges and would read the element just before
        // them, so inputs this small are scanned serially instead.
        InclusiveScanInPlaceSerial(data, function, 0, data.Length, 1);
    }
    else // parallel, non-loadbalance
    {
        InclusiveScanInPlaceParallel(data, function);
    }
}
/// <summary>Computes a sequential inclusive prefix scan over the array using the specified function.</summary>
/// <typeparam name="T">The type of the data in the array.</typeparam>
/// <param name="arr">The data, which will be overwritten with the computed prefix scan.</param>
/// <param name="function">The function used to combine the running result with the next element.</param>
/// <param name="arrStart">The index of the first element taking part in the scan.</param>
/// <param name="arrLength">
/// The bound that terminates the scan. It is compared directly against element
/// indices, so it acts as an exclusive upper index bound.
/// </param>
/// <param name="skip">The distance between consecutive elements taking part in the scan.</param>
/// <remarks>No parameter validation is performed.</remarks>
private static void InclusiveScanInPlaceSerial<T>(T[] arr, Func<T, T, T> function, int arrStart, int arrLength, int skip)
{
    int previous = arrStart;
    int current = previous + skip;

    // Walk the strided elements, folding each one into the result stored just before it.
    while (current < arrLength)
    {
        arr[current] = function(arr[previous], arr[current]);
        previous = current;
        current = previous + skip;
    }
}
/// <summary>Computes a sequential exclusive prefix scan over a range of the array using the specified function.</summary>
/// <typeparam name="T">The type of the data in the array.</typeparam>
/// <param name="arr">The data, which will be overwritten with the computed prefix scan.</param>
/// <param name="function">The function used to combine the running total with the next element.</param>
/// <param name="lowerBoundInclusive">The inclusive lower bound of the array at which to start the scan.</param>
/// <param name="upperBoundExclusive">The exclusive upper bound of the array at which to end the scan.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="arr"/> or <paramref name="function"/> is null.
/// </exception>
/// <remarks>
/// After the call, the first element of the range holds <c>default(T)</c> and every
/// later element holds the combination of all elements that originally preceded it
/// in the range. An empty range leaves the array untouched.
/// </remarks>
public static void ExclusiveScanInPlaceSerial<T>(T[] arr, Func<T, T, T> function, int lowerBoundInclusive, int upperBoundExclusive)
{
    // Validate arguments, matching the other public entry points of this class
    if (arr == null) throw new ArgumentNullException("arr");
    if (function == null) throw new ArgumentNullException("function");

    // An empty range has nothing to scan. Without this guard the seed step below
    // would overwrite (or index past) an element that lies outside the range.
    if (lowerBoundInclusive >= upperBoundExclusive) return;

    // The first slot of an exclusive scan has no predecessors, so it receives default(T).
    T total = arr[lowerBoundInclusive];
    arr[lowerBoundInclusive] = default(T);

    for (int i = lowerBoundInclusive + 1; i < upperBoundExclusive; i++)
    {
        // Each slot receives the total of everything before it; its own original
        // value is folded into the running total only for the slots that follow.
        T precedingTotal = total;
        total = function(total, arr[i]);
        arr[i] = precedingTotal;
    }
}
/// <summary>Computes an inclusive prefix scan over the array in place, using parallel loops and recursion.</summary>
/// <typeparam name="T">The type of the data in the array.</typeparam>
/// <param name="arr">The data, which will be overwritten with the computed prefix scan.</param>
/// <param name="function">The function used to combine two elements of the scan.</param>
/// <param name="arrStart">The index of the first element taking part in the scan.</param>
/// <param name="arrLength">The number of elements taking part in the scan.</param>
/// <param name="skip">The distance between consecutive elements taking part in the scan.</param>
/// <remarks>No parameter validation is performed.</remarks>
private static void InclusiveScanInPlaceWithLoadBalancingParallel<T>(T[] arr, Func<T, T, T> function,
    int arrStart, int arrLength, int skip)
{
    // Fewer than two elements: the data already is its own prefix scan.
    if (arrLength < 2)
    {
        return;
    }

    int pairCount = arrLength / 2;
    int pairStride = skip * 2;

    // Combine each adjacent pair, storing the result in the pair's second element.
    Parallel.For(0, pairCount, pairIndex =>
    {
        int first = arrStart + (pairIndex * pairStride);
        int second = first + skip;
        arr[second] = function(arr[first], arr[second]);
    });

    // Recursively prefix scan the pair results, which sit at twice the stride.
    InclusiveScanInPlaceWithLoadBalancingParallel(arr, function, arrStart + skip, pairCount, pairStride);

    // Fill in the remaining elements: each one combines the scanned pair result
    // before it with its own value. An even length has one fewer such element.
    int fillCount = (arrLength % 2) == 0 ? pairCount - 1 : pairCount;
    Parallel.For(0, fillCount, fillIndex =>
    {
        int scanned = arrStart + skip + (fillIndex * pairStride);
        int target = scanned + skip;
        arr[target] = function(arr[scanned], arr[target]);
    });
}
174 /// <summary>Computes a parallel inclusive prefix scan over the array using the specified function.</summary>
175 public static void InclusiveScanInPlaceParallel
<T
>(T
[] arr
, Func
<T
, T
, T
> function
)
177 int procCount
= Environment
.ProcessorCount
;
178 T
[] intermediatePartials
= new T
[procCount
];
179 using (var phaseBarrier
= new Barrier(procCount
,
180 _
=> ExclusiveScanInPlaceSerial(intermediatePartials
, function
, 0, intermediatePartials
.Length
)))
182 // Compute the size of each range
183 int rangeSize
= arr
.Length
/ procCount
;
184 int nextRangeStart
= 0;
186 // Create, store, and wait on all of the tasks
187 var tasks
= new Task
[procCount
];
188 for (int i
= 0; i
< procCount
; i
++, nextRangeStart
+= rangeSize
)
190 // Get the range for each task, then start it
192 int lowerRangeInclusive
= nextRangeStart
;
193 int upperRangeExclusive
= i
< procCount
- 1 ? nextRangeStart
+ rangeSize
: arr
.Length
;
194 tasks
[rangeNum
] = Task
.Factory
.StartNew(() =>
196 // Phase 1: Prefix scan assigned range, and copy upper bound to intermediate partials
197 InclusiveScanInPlaceSerial(arr
, function
, lowerRangeInclusive
, upperRangeExclusive
, 1);
198 intermediatePartials
[rangeNum
] = arr
[upperRangeExclusive
- 1];
200 // Phase 2: One thread only should prefix scan the intermediaries... done implicitly by the barrier
201 phaseBarrier
.SignalAndWait();
203 // Phase 3: Incorporate partials
206 for (int j
= lowerRangeInclusive
; j
< upperRangeExclusive
; j
++)
208 arr
[j
] = function(intermediatePartials
[rangeNum
], arr
[j
]);
214 // Wait for all of the tasks to complete